package org.codehaus.activemq.broker.impl;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.Broker;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportChannelListener;
import org.codehaus.activemq.transport.TransportServerChannel;
import org.codehaus.activemq.transport.TransportServerChannelProvider;

/**
 * A {@link BrokerConnector} that listens on a {@link TransportServerChannel}
 * and forwards every client request to the {@link BrokerContainer} it was
 * created for.
 * <p>
 * The connector registers itself as the {@link TransportChannelListener} of
 * its server channel; for each channel handed to {@link #addClient} it creates
 * a {@link BrokerClientImpl} and remembers it, keyed by the channel, until
 * {@link #removeClient} is called for that channel.
 */
public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
	/** Info object carrying the name of the broker this connector belongs to. */
	private final BrokerInfo brokerInfo;
	/** Server-side channel this connector listens on. */
	private final TransportServerChannel serverChannel;
	private final Log log;
	/** Container that every request is delegated to. */
	private final BrokerContainer container;
	/** TransportChannel -> BrokerClient, held in a synchronized map. */
	private final Map clients = Collections.synchronizedMap(new HashMap());

	/**
	 * Creates a connector whose server channel is built from a bind address.
	 *
	 * @param container   the container requests are delegated to
	 * @param bindAddress URI string the server channel is created for
	 * @param wireFormat  wire format handed to the transport provider
	 * @throws JMSException if the bind address is missing or not a valid URI,
	 *                      or the server channel cannot be created
	 */
	public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat)
			throws JMSException {
		this(container, createTransportServerChannel(wireFormat, bindAddress));
	}

	/**
	 * Creates a connector on an existing server channel, registers it with the
	 * container and installs it as the channel's listener.
	 *
	 * @param container     the container requests are delegated to
	 * @param serverChannel the channel to listen on; must not be null
	 * @throws IllegalArgumentException if serverChannel is null
	 */
	public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
		assert (container != null);
		// Validate before any side effect: otherwise a null channel would fail
		// only after this connector had already been added to the container.
		if (serverChannel == null) {
			throw new IllegalArgumentException("serverChannel must not be null");
		}
		this.brokerInfo = new BrokerInfo();
		this.brokerInfo.setBrokerName(container.getBroker().getBrokerName());
		this.log = LogFactory.getLog(getClass().getName());
		this.serverChannel = serverChannel;
		this.container = container;
		this.container.addConnector(this);
		serverChannel.setTransportChannelListener(this);
	}

	/**
	 * @return the info object created for this connector, carrying the broker name
	 */
	public BrokerInfo getBrokerInfo() {
		return this.brokerInfo;
	}

	/**
	 * @return the rounded capacity reported by the container's broker
	 */
	public int getBrokerCapacity() {
		return this.container.getBroker().getRoundedCapacity();
	}

	/**
	 * @return the server channel this connector listens on
	 */
	public TransportServerChannel getServerChannel() {
		return this.serverChannel;
	}

	/**
	 * Starts the underlying server channel.
	 *
	 * @throws JMSException if the server channel fails to start
	 */
	public void start() throws JMSException {
		this.serverChannel.start();
		this.log.info("ActiveMQ connector started: " + this);
	}

	/**
	 * Stops the underlying server channel and removes this connector from its
	 * container. The connector is removed from the container even when stopping
	 * the channel fails.
	 *
	 * @throws JMSException if the server channel fails to stop
	 */
	public void stop() throws JMSException {
		try {
			this.serverChannel.stop();
		} finally {
			// Always unregister, so a failed channel stop does not leave this
			// connector registered with the container.
			this.container.removeConnector(this);
		}
		this.log.info("ActiveMQ connector stopped: " + this);
	}

	/**
	 * Registers a client connection with the container.
	 *
	 * @throws JMSException if the container rejects the registration
	 */
	public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
		this.container.registerConnection(client, info);
	}

	/**
	 * Deregisters a client connection from the container.
	 *
	 * @throws JMSException if the container fails to deregister the connection
	 */
	public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
		this.container.deregisterConnection(client, info);
	}

	/**
	 * Registers a message consumer with the container.
	 *
	 * @throws JMSException if the consumer info has no destination, or the
	 *                      container rejects the registration
	 */
	public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
		if (info.getDestination() == null) {
			throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
		}
		this.container.registerMessageConsumer(client, info);
	}

	/**
	 * Deregisters a message consumer from the container.
	 *
	 * @throws JMSException if the container fails to deregister the consumer
	 */
	public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
		this.container.deregisterMessageConsumer(client, info);
	}

	/**
	 * Registers a message producer with the container.
	 *
	 * @throws JMSException if the container rejects the registration
	 */
	public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
		this.container.registerMessageProducer(client, info);
	}

	/**
	 * Deregisters a message producer from the container.
	 *
	 * @throws JMSException if the container fails to deregister the producer
	 */
	public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
		this.container.deregisterMessageProducer(client, info);
	}

	/**
	 * Registers a client session with the container.
	 *
	 * @throws JMSException if the container rejects the registration
	 */
	public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
		this.container.registerSession(client, info);
	}

	/**
	 * Deregisters a client session from the container.
	 *
	 * @throws JMSException if the container fails to deregister the session
	 */
	public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
		this.container.deregisterSession(client, info);
	}

	/**
	 * Asks the container to start the transaction with the given string id.
	 *
	 * @throws JMSException if the container fails to start the transaction
	 */
	public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
		this.container.startTransaction(client, transactionId);
	}

	/**
	 * Asks the container to roll back the transaction with the given string id.
	 *
	 * @throws JMSException if the container fails to roll back the transaction
	 */
	public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
		this.container.rollbackTransaction(client, transactionId);
	}

	/**
	 * Asks the container to commit the transaction with the given string id.
	 *
	 * @throws JMSException if the container fails to commit the transaction
	 */
	public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
		this.container.commitTransaction(client, transactionId);
	}

	/**
	 * Passes a message sent under the given transaction id to the container.
	 *
	 * @throws JMSException if the container fails to accept the message
	 */
	public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
			throws JMSException {
		this.container.sendTransactedMessage(client, transactionId, message);
	}

	/**
	 * Passes an acknowledgement made under the given transaction id to the container.
	 *
	 * @throws JMSException if the container fails to process the acknowledgement
	 */
	public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
			throws JMSException {
		this.container.acknowledgeTransactedMessage(client, transactionId, ack);
	}

	/**
	 * Passes a message to the container.
	 *
	 * @throws JMSException if the container fails to accept the message
	 */
	public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
		this.container.sendMessage(client, message);
	}

	/**
	 * Passes a message acknowledgement to the container.
	 *
	 * @throws JMSException if the container fails to process the acknowledgement
	 */
	public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
		this.container.acknowledgeMessage(client, ack);
	}

	/**
	 * Passes a durable-unsubscribe command to the container.
	 *
	 * @throws JMSException if the container fails to process the command
	 */
	public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
		this.container.durableUnsubscribe(client, ds);
	}

	/**
	 * Listener callback: creates a {@link BrokerClientImpl} for the channel,
	 * starts the channel and remembers the client under that channel.
	 * A {@link JMSException} raised along the way is logged rather than
	 * rethrown, and the client is then not remembered.
	 *
	 * @param channel the newly accepted transport channel
	 */
	public void addClient(TransportChannel channel) {
		try {
			BrokerClient client = new BrokerClientImpl();
			client.initialize(this, channel);
			if (this.log.isDebugEnabled()) {
				this.log.debug("Starting new client: " + client);
			}
			channel.start();
			this.clients.put(channel, client);
		} catch (JMSException e) {
			this.log.error("Failed to add client due to: " + e, e);
		}
	}

	/**
	 * Listener callback: forgets the client remembered for the channel and
	 * calls its {@code cleanUp()}. Logs a warning when no client is known for
	 * the channel.
	 *
	 * @param channel the transport channel that is going away
	 */
	public void removeClient(TransportChannel channel) {
		BrokerClient client = (BrokerClient) this.clients.remove(channel);
		if (client != null) {
			if (this.log.isDebugEnabled()) {
				this.log.debug("Client leaving client: " + client);
			}

			client.cleanUp();
		} else {
			this.log.warn("No such client for channel: " + channel);
		}
	}

	/**
	 * @return the container this connector delegates to
	 */
	public BrokerContainer getBrokerContainer() {
		return this.container;
	}

	/**
	 * Parses the bind address as a URI and asks
	 * {@link TransportServerChannelProvider} to create a server channel for it.
	 *
	 * @param wireFormat  wire format handed to the provider
	 * @param bindAddress URI string to bind to
	 * @return the channel created by the provider
	 * @throws JMSException if bindAddress is null or not a syntactically valid
	 *                      URI, or the provider fails
	 */
	protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress)
			throws JMSException {
		// new URI(null) would throw a bare NullPointerException; report it as
		// the declared JMSException like any other unusable address.
		if (bindAddress == null) {
			throw new JMSException("No bindAddress specified");
		}
		URI url;
		try {
			url = new URI(bindAddress);
		} catch (URISyntaxException e) {
			JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
			jmsEx.setLinkedException(e);
			throw jmsEx;
		}
		return TransportServerChannelProvider.create(wireFormat, url);
	}

	/**
	 * Asks the container to start the XA transaction identified by xid.
	 *
	 * @throws XAException if the container fails to start the transaction
	 */
	public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		this.container.startTransaction(client, xid);
	}

	/**
	 * @return the prepared XA transactions reported by the container for the client
	 * @throws XAException if the container fails to list them
	 */
	public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
		return this.container.getPreparedTransactions(client);
	}

	/**
	 * Asks the container to prepare the XA transaction identified by xid.
	 *
	 * @return the result code returned by the container
	 * @throws XAException if the container fails to prepare the transaction
	 */
	public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		return this.container.prepareTransaction(client, xid);
	}

	/**
	 * Asks the container to roll back the XA transaction identified by xid.
	 *
	 * @throws XAException if the container fails to roll back the transaction
	 */
	public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		this.container.rollbackTransaction(client, xid);
	}

	/**
	 * Asks the container to commit the XA transaction identified by xid.
	 *
	 * @param onePhase passed through to the container unchanged
	 * @throws XAException if the container fails to commit the transaction
	 */
	public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
		this.container.commitTransaction(client, xid, onePhase);
	}

	/**
	 * Returns the broker name as the resource manager id; the client argument
	 * is not consulted.
	 *
	 * @return the broker name held in this connector's broker info
	 */
	public String getResourceManagerId(BrokerClient client) {
		return getBrokerInfo().getBrokerName();
	}
}